Example of using a trained classifier on these 4 signal classes (BLANK/noise, FM, GSM, CARRIER).
import pandas as pd
from datetime import datetime
import tensorflow as tf
from tensorflow.keras import Model
from timeit import default_timer as timer
from numpy import load, save
import numpy as np
import time
import ast
import sys
from datetime import datetime
# Divisor that turns a sample index into a time stamp in load_data_file
# (presumably samples per second - confirm against the capture setup).
sample_rate = 250000
# Multiplier applied to the per-signal window count when the dense arrays are sized.
signals_count = 1
# Number of consecutive I/Q samples in one window.
capture_size = 2048
# Number of training windows and test windows cut out of each capture.
train_size = 800
test_size = 200
# NOTE(review): not read anywhere in the visible part of this script.
use_preprocessed_data = False
# Prefix prepended to every capture, .npy and model path.
dir_base_model = ""
#model_name = "clasifier-2"
# Base name of the saved model; ".model" and ".weights" are appended when it is loaded.
model_name = "clasifier-2-1-new"
print(tf.__version__)
# Record layout used to read the capture files: a float32 'i' field and a float32 'q' field.
dt = np.dtype([('i', 'f4'), ('q', 'f4')])
def load_data_file(file, label):
    """Load a raw I/Q capture file into a DataFrame with label and time columns.

    Args:
        file: File name; it is appended to ``dir_base_model`` to form the path.
        label: Value written to the ``label`` column of every row.

    Returns:
        A DataFrame holding the ``i`` and ``q`` fields read through the
        module-level ``dt`` record dtype, a constant ``label`` column and a
        float32 ``time`` column equal to the row index divided by
        ``sample_rate``.
    """
    data = np.fromfile(dir_base_model + file, dtype=dt)
    df = pd.DataFrame(data)
    df["label"] = label
    # np.arange builds the 0..n-1 ramp natively instead of walking a Python
    # range object; dtype stays float32 so the column keeps its original type.
    sample_index = np.arange(len(df), dtype='f4')
    df["time"] = sample_index / sample_rate
    return df
# Labels: BLANK, FM, GSM, CARRIER
# Only the two FM captures are loaded here; the other captures are disabled.
#dfa1 = load_data_file("train-noise-1g1-40.dat", "BLANK")
dfb2 = load_data_file("train-fm-95500000-40.dat", "FM")
dfb3 = load_data_file("train-fm-95300000-40.dat", "FM")
#dfc1 = load_data_file("train-gsm-891101000-40.dat", "GSM")
#dfd1 = load_data_file("train-carrier-950380000-40.dat", "CARRIER")
# Stack the loaded captures row-wise into a single frame.
frames = [dfb2, dfb3]
result = pd.concat(frames)
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly
# Preview figure: a single row of four subplots.
fig = make_subplots(rows=1, cols=4)
font_options = dict(family="Courier New, monospace", size=18, color="#7f7f7f")
# Traces for the other captures are disabled; only dfb3 is drawn below.
#fig.add_trace(go.Scatter(x=dfa1[0:capture_size]['time'], y=dfa1[0:capture_size]['i'], mode='lines', name='i'), row=1, col=1)
#fig.add_trace(go.Scatter(x=dfa1[0:capture_size]['time'], y=dfa1[0:capture_size]['q'], mode='lines', name='q'), row=1, col=1)
#fig.add_trace(go.Scatter(x=dfb1[0:capture_size]['time'], y=dfb1[0:capture_size]['i'], mode='lines', name='i'), row=1, col=1)
#fig.add_trace(go.Scatter(x=dfb1[0:capture_size]['time'], y=dfb1[0:capture_size]['q'], mode='lines', name='q'), row=1, col=1)
#fig.add_trace(go.Scatter(x=dfb2[0:capture_size]['time'], y=dfb2[0:capture_size]['i'], mode='lines', name='i'), row=1, col=2)
#fig.add_trace(go.Scatter(x=dfb2[0:capture_size]['time'], y=dfb2[0:capture_size]['q'], mode='lines', name='q'), row=1, col=2)
# First capture_size samples of dfb3: I and Q against time, in the third subplot.
fig.add_trace(go.Scatter(x=dfb3[0:capture_size]['time'], y=dfb3[0:capture_size]['i'], mode='lines', name='i'), row=1, col=3)
fig.add_trace(go.Scatter(x=dfb3[0:capture_size]['time'], y=dfb3[0:capture_size]['q'], mode='lines', name='q'), row=1, col=3)
#fig.add_trace(go.Scatter(x=dfc1[0:capture_size]['time'], y=dfc1[0:capture_size]['i'], mode='lines', name='i'), row=1, col=3)
#fig.add_trace(go.Scatter(x=dfc1[0:capture_size]['time'], y=dfc1[0:capture_size]['q'], mode='lines', name='q'), row=1, col=3)
#fig.add_trace(go.Scatter(x=dfd1[0:capture_size]['time'], y=dfd1[0:capture_size]['i'], mode='lines', name='i'), row=1, col=4)
#fig.add_trace(go.Scatter(x=dfd1[0:capture_size]['time'], y=dfd1[0:capture_size]['q'], mode='lines', name='q'), row=1, col=4)
fig.update_layout(height=400, width=800, title="NOISE - FM - GSM - CARRIER", xaxis_title="time", yaxis_title="V", font=font_options)
fig.show()
def get_signal_dataframes(df):
    """Cut sliding sample windows out of one capture for training and testing.

    Each window is a row slice of ``capture_size`` rows. Successive windows
    start ``interleave`` (5) rows apart. Training windows start at row 20480;
    test windows start past the end of the last training window, so the two
    sets do not share rows.

    Args:
        df: Capture frame with ``i``, ``q`` and ``label`` columns.

    Returns:
        ``(train_x, train_y, test_x, test_y)``, each the result of passing the
        list of window frames to ``pd.DataFrame``. The x windows hold the
        ``i``/``q`` columns scaled by 10000; the y windows hold the ``label``
        column. NOTE(review): the layout ``pd.DataFrame`` gives a list of
        frames depends on the pandas version - confirm with the one in use.
    """
    interleave = 5
    train_start = 20480

    # NOTE: DATA MANIPULATION: the I/Q values are amplified by 10000.
    samples = df[['i', 'q']] * 10000.0
    labels = df[['label']]

    def cut_windows(frame, count, first_row):
        # One slice of capture_size rows per window, stepping by interleave.
        starts = [interleave * k + first_row for k in range(count)]
        return [frame[begin:begin + capture_size] for begin in starts]

    train_windows_x = cut_windows(samples, train_size, train_start)
    train_windows_y = cut_windows(labels, train_size, train_start)

    # Offset that separates the test rows from the training rows in df.
    test_start = train_size * interleave + capture_size + 20480
    test_windows_x = cut_windows(samples, test_size, test_start)
    test_windows_y = cut_windows(labels, test_size, test_start)

    # Convert the window lists to dataframes.
    return (
        pd.DataFrame(train_windows_x),
        pd.DataFrame(train_windows_y),
        pd.DataFrame(test_windows_x),
        pd.DataFrame(test_windows_y),
    )
def add_dataframe_for_signal(df, train_x, train_y, test_x, test_y):
    """Window one capture and append the four resulting frames to the given lists.

    Args:
        df: Capture frame handed to ``get_signal_dataframes``.
        train_x, train_y, test_x, test_y: Lists that are extended in place,
            each with the matching frame returned for ``df``.
    """
    produced = get_signal_dataframes(df)
    collectors = (train_x, train_y, test_x, test_y)
    for collector, frame in zip(collectors, produced):
        collector.append(frame)
# Accumulators filled in place by add_dataframe_for_signal, one entry per capture.
train_x = []
train_y = []
test_x = []
test_y = []
# Add different data set samples
# (only dfb3 is enabled; the other captures are toggled off)
#add_dataframe_for_signal(dfa1, train_x, train_y, test_x, test_y)
#add_dataframe_for_signal(dfa2, train_x, train_y, test_x, test_y)
#add_dataframe_for_signal(dfa3, train_x, train_y, test_x, test_y)
#add_dataframe_for_signal(dfb1, train_x, train_y, test_x, test_y)
#add_dataframe_for_signal(dfb2, train_x, train_y, test_x, test_y)
add_dataframe_for_signal(dfb3, train_x, train_y, test_x, test_y)
#add_dataframe_for_signal(dfb4, train_x, train_y, test_x, test_y)
#add_dataframe_for_signal(dfb5, train_x, train_y, test_x, test_y)
#add_dataframe_for_signal(dfb6, train_x, train_y, test_x, test_y)
#add_dataframe_for_signal(dfb7, train_x, train_y, test_x, test_y)
#add_dataframe_for_signal(dfc1, train_x, train_y, test_x, test_y)
#add_dataframe_for_signal(dfc2, train_x, train_y, test_x, test_y)
#add_dataframe_for_signal(dfd1, train_x, train_y, test_x, test_y)
#add_dataframe_for_signal(dfd2, train_x, train_y, test_x, test_y)
# Build dataframes
# (the names are rebound from lists of frames to the concatenated frames)
train_x = pd.concat(train_x)
train_y = pd.concat(train_y)
test_x = pd.concat(test_x)
test_y = pd.concat(test_y)
# Shuffle data
# The same permutation is applied to x and y so windows stay paired with their labels.
permutations = np.random.permutation(len(train_y))
train_x_pre = train_x.iloc[permutations]
train_y_pre = train_y.iloc[permutations]
# A second, independent permutation is drawn for the test set.
permutations = np.random.permutation(len(test_x))
test_x_pre = test_x.iloc[permutations]
test_y_pre = test_y.iloc[permutations]
def create_np_array_x(d, size, n_signals=None, window_size=None):
    """Pack a frame of I/Q windows into a dense ``(windows, 2, samples)`` array.

    Args:
        d: Frame whose column 0 holds, for each row, a window object exposing
            ``.values`` as a 2-D array with I in column 0 and Q in column 1.
        size: Number of windows per signal.
        n_signals: Number of signals; defaults to the module-level
            ``signals_count``.
        window_size: Samples per window; defaults to the module-level
            ``capture_size``.

    Returns:
        A float array of shape ``(n_signals * size, 2, window_size)`` where
        ``[k, 0, :]`` is the I channel and ``[k, 1, :]`` the Q channel of
        window ``k``. Windows shorter than ``window_size`` and rows that are
        never written are left at zero.

    Raises:
        IndexError: If ``d`` has more rows than ``n_signals * size``.
        ValueError: If a window holds more than ``window_size`` samples.
    """
    if n_signals is None:
        n_signals = signals_count
    if window_size is None:
        window_size = capture_size

    # Zero-filled so that short windows never expose uninitialised memory.
    new = np.zeros(shape=(n_signals * size, 2, window_size), dtype=float)

    # d.values is taken once; each window is copied per channel, not per sample.
    for i, row in enumerate(d.values):
        samples = np.asarray(row[0].values)
        count = len(samples)
        new[i, 0, :count] = samples[:, 0]
        new[i, 1, :count] = samples[:, 1]
    return new
def create_np_array_y(d, size, n_signals=None):
    """Encode the label of every window as an integer class index.

    Args:
        d: Frame whose column 0 holds, for each row, a window object exposing
            ``.values`` as a 2-D array of labels. Only the first label of
            each window is read.
        size: Number of windows per signal.
        n_signals: Number of signals; defaults to the module-level
            ``signals_count``.

    Returns:
        An int16 array of length ``n_signals * size`` with 0 for ``BLANK``,
        1 for ``FM``, 2 for ``GSM`` and 3 for ``CARRIER``. Entries beyond the
        rows of ``d`` stay at zero.

    Raises:
        ValueError: If a window carries a label outside the four known names.
    """
    label_codes = {'BLANK': 0, 'FM': 1, 'GSM': 2, 'CARRIER': 3}
    if n_signals is None:
        n_signals = signals_count

    new = np.zeros(shape=(n_signals * size), dtype=np.int16)
    for i, row in enumerate(d.values):
        # just check the first value
        label = row[0].values[0][0]
        code = label_codes.get(label)
        if code is None:
            # Fail with the offending label instead of calling an undefined name.
            raise ValueError("unknown signal label: {!r}".format(label))
        new[i] = code
    return new
def generate_train_and_test_data():
    """Convert the shuffled window frames into dense arrays.

    Reads the module-level ``train_x_pre``, ``train_y_pre``, ``test_x_pre``
    and ``test_y_pre`` frames.

    Returns:
        ``(train_x, train_y, test_x, test_y)`` as produced by
        ``create_np_array_x`` / ``create_np_array_y`` with ``train_size`` and
        ``test_size`` respectively.
    """
    return (
        create_np_array_x(train_x_pre, train_size),
        create_np_array_y(train_y_pre, train_size),
        create_np_array_x(test_x_pre, test_size),
        create_np_array_y(test_y_pre, test_size),
    )
# Time the conversion of the shuffled window frames into dense arrays.
start = timer()
train_x, train_y, test_x, test_y = generate_train_and_test_data()
# Store the dense arrays as .npy files under dir_base_model.
save(dir_base_model + 'usage_train_x.npy', train_x)
save(dir_base_model + 'usage_train_y.npy', train_y)
save(dir_base_model + 'usage_test_x.npy', test_x)
save(dir_base_model + 'usage_test_y.npy', test_y)
print("Preprocessed data in: " + str(timer()-start))
# Prepare the batch of windows to predict: samples_to_show windows starting at element
element = 0
samples_to_show = 200
# Keep failing loudly when the test set is too small for the requested batch.
if len(test_x) < element + samples_to_show or len(test_y) < element + samples_to_show:
    raise IndexError("test set holds fewer than element + samples_to_show windows")
# One slice copy per array replaces the per-sample Python loops.
td = np.array(test_x[element:element + samples_to_show], dtype=float)
ld = np.array(test_y[element:element + samples_to_show], dtype=int)
def label_from_column(max_column):
    """Return the display name of a class index.

    Args:
        max_column: Class index to translate.

    Returns:
        "FM" for 1, "GSM" for 2, "Carrier" for 3, "Noise" for 0 and "Error"
        for anything else.
    """
    # Checked in the same order as listed; the first equal entry wins.
    names_by_column = ((1, "FM"), (2, "GSM"), (3, "Carrier"), (0, "Noise"))
    for column, name in names_by_column:
        if max_column == column:
            return name
    return "Error"
def load_last_model():
    """Load the saved Keras model and its weights, print its summary, return it.

    The model is read from ``dir_base_model + model_name + ".model"`` and the
    weights from the same base path with the ``".weights"`` suffix.
    """
    base_path = dir_base_model + model_name
    model = tf.keras.models.load_model(base_path + ".model")
    model.load_weights(base_path + ".weights")
    model.summary()
    return model
model = load_last_model()
# Time one batched prediction over every window in td.
start = timer()
predictions = model.predict(td)
end = timer()
# Report the actual number of predictions made.
print(str(len(predictions)) + " predictions, took: " + str(end - start))
# Build predicted values: index of the highest score of each prediction, kept as a string
listx = []
listx_numeric = [str(np.argmax(scores, axis=None)) for scores in predictions]
def test_function_printed(df, label, i, j, fig):
    """Add the two channels of one window as line traces to a subplot of ``fig``.

    Args:
        df: Window indexed by channel; ``df[0]`` is drawn as trace 'i' and
            ``df[1]`` as trace 'q'.
        label: Not used by this function.
        i: Subplot row.
        j: Subplot column.
        fig: Figure that receives the traces.

    The x axis is the ``time`` column of the first ``capture_size`` rows of
    the module-level ``dfb3`` frame.
    """
    for channel, trace_name in enumerate(('i', 'q')):
        time_axis = dfb3[0:capture_size]['time']
        trace = go.Scatter(x=time_axis, y=df[channel], mode='lines', name=trace_name)
        fig.add_trace(trace, row=i, col=j)
# Result figure: the first eight windows of the batch on a 2 x 4 grid.
fig = make_subplots(rows=2, cols=4)
# (row, col) of the subplot that receives window 0, 1, 2, ... in that order.
subplot_cells = [(1, 1), (1, 2), (1, 3), (2, 1), (2, 2), (2, 3), (1, 4), (2, 4)]
for i, (subplot_row, subplot_col) in enumerate(subplot_cells):
    test_function_printed(td[i], ld[i], subplot_row, subplot_col, fig)
# The title lists the labels of the first five windows.
first_labels = ', '.join(str(ld[k]) for k in range(5))
fig.update_layout(
    title="This should be " + first_labels,
    xaxis_title="time",
    yaxis_title="value",
    height=400,
    width=800,
    font={"family": "Courier New, monospace", "size": 18, "color": "#7f7f7f"},
)
fig.show()
from sklearn import metrics
# Actual values: ld holds the ground-truth class of every window in the batch
y_act = [str(label) for label in ld]
# Predicted values: listx_numeric holds the class the model chose for the same windows
y_pred = list(listx_numeric)
print("Predicted " + str(y_pred))
print("Real " + str(y_act))
class_labels = ["0", "1", "2", "3"]
# Printing the confusion matrix
# Both sklearn helpers take (y_true, y_pred) in that order, so:
# the columns will show the instances predicted for each label,
# and the rows will show the actual number of instances for each label.
confusion_matrix = metrics.confusion_matrix(y_act, y_pred, labels=class_labels)
# Printing the precision and recall, among other metrics
print(metrics.classification_report(y_act, y_pred, labels=class_labels))
import plotly.figure_factory as ff
# Matrix counts as nested Python lists for the heatmap
z = confusion_matrix.tolist()
x = ["NOISE", "FM", "GSM", "Carrier"]
y = list(x)
# Annotation text: every count rendered as a string
z_text = [[str(count) for count in matrix_row] for matrix_row in z]
# Set up the figure
fig = ff.create_annotated_heatmap(z, x=x, y=y, annotation_text=z_text, colorscale='Blues')
# Add the title
fig.update_layout(title_text='<i><b>Confusion matrix</b></i>')
# Custom x axis title, placed below the plot in paper coordinates
x_axis_title = {
    "font": {"color": "black", "size": 14},
    "x": 0.5,
    "y": -0.15,
    "showarrow": False,
    "text": "Predicted value",
    "xref": "paper",
    "yref": "paper",
}
fig.add_annotation(x_axis_title)
# Custom y axis title, rotated and placed left of the plot
y_axis_title = {
    "font": {"color": "black", "size": 14},
    "x": -0.35,
    "y": 0.5,
    "showarrow": False,
    "text": "Real value",
    "textangle": -90,
    "xref": "paper",
    "yref": "paper",
}
fig.add_annotation(y_axis_title)
# Widen the margins to make room for the y axis title
fig.update_layout(margin={"t": 50, "l": 200})
fig.show()
Pobre clasificación, pero puede asumirse que cuando es carrier y FM, es FM. Esto es porque a veces una FM puede parecer un carrier y estaría bien considerarlo así.